package com.qianjing.common.utils.kafka;

import com.alibaba.fastjson.JSONObject;
import com.qianjing.common.constant.KafkaConstants;
import com.qianjing.project.common.domain.DbRecall;
import com.qianjing.project.common.service.IDbRecallService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Optional;

/**
 * Kafka consumer component: receives {@code DbRecall} messages (single or
 * batch), upserts them through {@link IDbRecallService}, and commits offsets
 * manually only after a successful write so failed records are redelivered.
 *
 * <p>Created by zzymaster@163.com 2022/8/2 15:06
 *
 * @version V1.0
 */
@Component
public class KafkaConsumerManager {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerManager.class);

    /**
     * Thread-safe timestamp formatter. Replaces the previously shared
     * {@link java.text.SimpleDateFormat}, which is not thread-safe and can
     * produce corrupted output when several listener threads format
     * timestamps concurrently.
     */
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());

    // NOTE(review): raw type, and unused in this class — consider
    // RedisTemplate<String, Object> and private visibility if no external
    // code relies on this public field; left unchanged to avoid breaking callers.
    @Autowired
    public RedisTemplate redisTemplate;

    @Autowired
    private IDbRecallService dbRecallService;


    /**
     * Consumes a single Kafka message and upserts the parsed {@link DbRecall}.
     * The offset is acknowledged (manually committed) only when the database
     * write reports success, so a failed message remains for redelivery.
     *
     * <p>The {@code @KafkaListener} annotation is currently commented out, so
     * this single-record listener is disabled; the batch variant below is the
     * active consumer.
     *
     * @author kevin
     * @param record the Kafka record; its value is the JSON form of a DbRecall
     * @param ack manual acknowledgment callback used to commit the offset
     * @date 2021/8/3 15:14
     */
//    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, topicPartitions = {
//            @TopicPartition(topic = KafkaConstants.TOPIC_TEST, partitions = {"0"}),
//    }, groupId = KafkaConstants.TOPIC_GROUP1,containerFactory = "oneFactory")
    public void topicTest(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String message = record.value();
        if (message == null) {
            return; // tombstone / empty payload: nothing to persist, nothing to ack
        }
        log.info("topic_test 消费了： Topic:{},key:{},Message:{}", record.topic(), record.key(), message);
        DbRecall dbRecall = JSONObject.parseObject(message, DbRecall.class);
        if (upsertDbRecall(dbRecall) > 0) {
            log.info("消费成功了............................................");
            ack.acknowledge(); // manually commit the offset only after a successful write
        }
    }

    /**
     * Batch Kafka consumer; must be used together with the container factory
     * configured by the {@code batchFactory} bean. The whole batch is
     * processed inside one transaction: any exception rolls back the database
     * writes and skips the offset commit, so the batch is redelivered.
     *
     * @author kevin
     * @param records the batch of Kafka records to process
     * @param ack manual acknowledgment callback used to commit the offsets
     * @date 2021/8/3 15:15
     */
    @Transactional(rollbackFor = Exception.class)
    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, topicPartitions = {
            @TopicPartition(topic = KafkaConstants.TOPIC_TEST, partitions = {"0", "1", "2", "3", "4"}),
    }, groupId = KafkaConstants.TOPIC_GROUP2, containerFactory="batchFactory")
    public void topicTest2(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        log.info("**********************************接收数量{}**************************************", records.size());
        try {
            for (ConsumerRecord<String, String> record : records) {
                String message = record.value();
                if (message == null) {
                    continue; // tombstone / empty payload: skip, keep processing the batch
                }
                long current = System.currentTimeMillis();
                log.info("**********************************kafka接收信息打印开始**************************************");
                log.info("kafka接收信息：\t{}", record);
                log.info("kafka数据：\t{}", message);
                log.info("分区：{}", record.partition());
                log.info("偏移量：{}", record.offset());
                log.info("报文时间：{}", FORMATTER.format(Instant.ofEpochMilli(record.timestamp())));
                log.info("系统时间：{}", FORMATTER.format(Instant.ofEpochMilli(current)));
                log.info("**********************************kafka信息打印结束**************************************");
                log.info("topic_test1 消费了： Topic:{},key:{},Message:{}", record.topic(), record.key(), message);
                upsertDbRecall(JSONObject.parseObject(message, DbRecall.class));
            }
            ack.acknowledge(); // manual offset commit, once per successfully persisted batch
            log.info("消费成功了............................................");

        } catch (Exception e) {
            // Log the full stack trace (previously only e.getMessage(), which
            // discarded the cause), then rethrow so @Transactional rolls back
            // and the offset stays uncommitted for redelivery.
            log.error("********kafka接收数据出错********", e);
            throw e;
        }
    }

    /**
     * Inserts the record when no row with its id exists yet, otherwise updates
     * the existing row with the incoming data.
     *
     * <p>Bug fix: the update branch previously called
     * {@code updateDbRecall(oldInfo)}, re-saving the unchanged database row
     * and silently discarding the message payload; it now persists the
     * incoming {@code dbRecall}.
     *
     * @param dbRecall record parsed from the Kafka message payload
     * @return affected row count reported by the insert/update
     */
    private int upsertDbRecall(DbRecall dbRecall) {
        DbRecall oldInfo = dbRecallService.selectDbRecallById(dbRecall.getId());
        if (oldInfo == null) {
            return dbRecallService.insertDbRecall(dbRecall);
        }
        return dbRecallService.updateDbRecall(dbRecall);
    }
}
